{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "binding-delta",
   "metadata": {
    "papermill": {
     "duration": 0.002386,
     "end_time": "2022-10-26T08:37:48.168742",
     "exception": false,
     "start_time": "2022-10-26T08:37:48.166356",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "# spark-json-to-parquet"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "63d16770-79ac-4255-a2d0-29412418bebf",
   "metadata": {
    "papermill": {
     "duration": 0.004369,
     "end_time": "2022-10-26T08:37:48.177011",
     "exception": false,
     "start_time": "2022-10-26T08:37:48.172642",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "Converts a JSON file to parquet using ApacheSpark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "satellite-vegetation",
   "metadata": {
    "papermill": {
     "duration": 2.57565,
     "end_time": "2022-10-26T08:37:50.756462",
     "exception": false,
     "start_time": "2022-10-26T08:37:48.180812",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "!pip install pyspark==3.3.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "beginning-wisdom",
   "metadata": {
    "papermill": {
     "duration": 0.50325,
     "end_time": "2022-10-26T08:37:51.262358",
     "exception": false,
     "start_time": "2022-10-26T08:37:50.759108",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "import glob\n",
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "import os\n",
    "import shutil\n",
    "import sys\n",
    "import logging\n",
    "import re"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abstract-cambridge",
   "metadata": {
    "papermill": {
     "duration": 0.013453,
     "end_time": "2022-10-26T08:37:51.280521",
     "exception": false,
     "start_time": "2022-10-26T08:37:51.267068",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# source path and file name (default: data.csv)\n",
    "data_json = os.environ.get('data_json', 'data.json')\n",
    "\n",
    "# destination path and parquet file name (default: data.parquet)\n",
    "output_data_parquet = os.environ.get('output_data_parquet', 'data.parquet')\n",
    "\n",
    "# url of master (default: local mode)\n",
    "master = os.environ.get('master', \"local[*]\")\n",
    "\n",
    "# temporal data storage for local execution\n",
    "data_dir = os.environ.get('data_dir', '../../data/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7ff12eff-e0f6-4301-b9a6-8a61caa33435",
   "metadata": {
    "papermill": {
     "duration": 0.014731,
     "end_time": "2022-10-26T08:37:51.299745",
     "exception": false,
     "start_time": "2022-10-26T08:37:51.285014",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "parameters = list(\n",
    "    map(lambda s: re.sub('$', '\"', s),\n",
    "        map(\n",
    "            lambda s: s.replace('=', '=\"'),\n",
    "            filter(\n",
    "                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
    "                sys.argv\n",
    "            )\n",
    "    )))\n",
    "\n",
    "for parameter in parameters:\n",
    "    logging.warning('Parameter: ' + parameter)\n",
    "    exec(parameter)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "welsh-grave",
   "metadata": {
    "papermill": {
     "duration": 5.451296,
     "end_time": "2022-10-26T08:37:56.755264",
     "exception": false,
     "start_time": "2022-10-26T08:37:51.303968",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "sc = SparkContext.getOrCreate(SparkConf().setMaster(master))\n",
    "spark = SparkSession.builder.getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "adjacent-yemen",
   "metadata": {
    "papermill": {
     "duration": 6.452588,
     "end_time": "2022-10-26T08:38:03.211162",
     "exception": false,
     "start_time": "2022-10-26T08:37:56.758574",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "df = spark.read.json(data_dir + data_json)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1558afed-1c09-4455-b2df-73f3dd8e0e0d",
   "metadata": {
    "papermill": {
     "duration": 0.014632,
     "end_time": "2022-10-26T08:38:03.229094",
     "exception": false,
     "start_time": "2022-10-26T08:38:03.214462",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "destination = data_dir + output_data_parquet\n",
    "\n",
    "if os.path.isfile(destination):\n",
    "    os.remove(destination)\n",
    "if os.path.isdir(destination):\n",
    "    shutil.rmtree(destination)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "metropolitan-issue",
   "metadata": {
    "papermill": {
     "duration": 3.692036,
     "end_time": "2022-10-26T08:38:06.923997",
     "exception": false,
     "start_time": "2022-10-26T08:38:03.231961",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "df.coalesce(1).write.parquet(destination)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6fc6da1a-6ade-4b05-a788-22b722acdb7d",
   "metadata": {
    "papermill": {
     "duration": 0.03349,
     "end_time": "2022-10-26T08:38:06.961400",
     "exception": false,
     "start_time": "2022-10-26T08:38:06.927910",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# get rid of nasty spark / hadoop folder\n",
    "source = glob.glob(destination+ '/*.parquet')[0]\n",
    "shutil.move(source, data_dir + output_data_parquet + '.tmp')\n",
    "shutil.rmtree(destination) \n",
    "shutil.move(data_dir + output_data_parquet + '.tmp', destination)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.6"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 22.871194,
   "end_time": "2022-10-26T08:38:09.598074",
   "environment_variables": {},
   "exception": null,
   "input_path": "/home/romeokienzler/gitco/claimed/component-library/transform/spark-json-to-parquet.ipynb",
   "output_path": "/home/romeokienzler/gitco/claimed/component-library/transform/spark-json-to-parquet.ipynb",
   "parameters": {},
   "start_time": "2022-10-26T08:37:46.726880",
   "version": "2.3.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
